Buffer size can nog be configured with database.conf
authorJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 4 Oct 2018 09:03:27 +0000 (11:03 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Thu, 4 Oct 2018 09:03:27 +0000 (11:03 +0200)
include/siri/db/buffer.h
src/siri/admin/request.c
src/siri/db/buffer.c
src/siri/db/db.c

index 326c393a09d8059556a5025b1450937457634d60..db2b56dd30c2bdc335c733493ee96e609caff856 100644 (file)
@@ -41,7 +41,7 @@ int siridb_buffer_write_last_point(
 struct siridb_buffer_s
 {
     size_t size;            /* size for one series inside the buffer */
-    size_t nsize;           /* optional new size from database.conf */
+    size_t _to_size;        /* optional new size from database.conf */
     size_t len;             /* number of points allocated per series */
     char * template;        /* template for writing an empty buffer */
     char * path;            /* path where the buffer file is stored */
index 02f1710e00d8bbbadc9497296e38bc47b627105e..9f094bc1162338fd83d8318723296c7797a6faab 100644 (file)
 "# Alternative path to save the buffer file.\n" \
 "# In case you later plan to change this location you manually need to move\n" \
 "# the buffer file to the new location.\n" \
-"# path = <buffer_path>\n"
+"# path = <buffer_path>\n" \
+"\n" \
+"# Buffer size in bytes. This size must be a multiple of 512 with a maximum\n" \
+"# of 1048576 bytes. Be careful using large values since SiriDB will require\n" \
+"# memory based on this value. A value between 1024 and 32768 is recommended.\n" \
+"# size = 1024\n"
 
 #define CHECK_DBNAME_AND_CREATE_PATH                                        \
     pcre_exec_ret = pcre2_match(                                            \
index d1281a6c1231b4f78f7d1fb115a39d612a4d2bd8..2cf61269dd76e8f7035c182c64c98005910e7cd5 100644 (file)
@@ -17,6 +17,7 @@
 #include <siri/db/db.h>
 #include <siri/db/misc.h>
 #include <siri/db/shard.h>
+#include <siri/db/shards.h>
 #include <siri/siri.h>
 #include <stdio.h>
 #include <string.h>
@@ -36,6 +37,8 @@ static int buffer__use_empty(
         siridb_buffer_t * buffer,
         siridb_series_t * series);
 static void buffer__migrate_to_new(char * pt, size_t sz);
+static void buffer__init_template(char * template, size_t size);
+
 
 /* buffer__start cannot conflict with a series_id since id 0 is never used */
 static const uint32_t buffer__start = 0x00000000;
@@ -58,7 +61,7 @@ siridb_buffer_t * siridb_buffer_new(void)
     buffer->fd = 0;
     buffer->fp = NULL;
     buffer->len = 0;
-    buffer->nsize = 0;  /* 0 means no new size */
+    buffer->_to_size = 0;  /* 0 means no new size */
     buffer->path = NULL;
     buffer->size = 0;
     buffer->template = NULL;
@@ -231,28 +234,6 @@ int siridb_buffer_open(siridb_buffer_t * buffer)
     return rc;
 }
 
-static void buffer__migrate_to_new(char * pt, size_t sz)
-{
-    char * npt = pt;
-    char * end = pt + sz;
-    uint32_t series_id = *((uint32_t *) pt);
-    pt += sizeof(uint32_t);
-    size_t num = *((size_t *) pt);
-    pt += sizeof(size_t);
-
-    memcpy(npt, &buffer__start, sizeof(uint32_t));
-    npt += sizeof(uint32_t);
-    memcpy(npt, &series_id, sizeof(uint32_t));
-    npt += sizeof(uint32_t);
-    memmove(npt, pt, num * 16);
-    npt += num * 16;
-
-    for (; npt < end; npt += sizeof(uint64_t))
-    {
-        memcpy(npt, &buffer__end, sizeof(uint64_t));
-    }
-}
-
 /*
  * Returns 0 if successful or -1 in case of an error.
  * (signal might be raised)
@@ -262,10 +243,14 @@ int siridb_buffer_load(siridb_t * siridb)
     siridb_buffer_t * buffer = siridb->buffer;
     FILE * fp;
     FILE * fp_temp;
-    size_t read_at_once = (size_t) (MAX_BUFFER_SZ / buffer->size);
+    size_t cur_size = buffer->size;
+    size_t cur_len = cur_size / sizeof(siridb_point_t);
+    size_t new_size =  buffer->_to_size ? buffer->_to_size : cur_size;
+    size_t new_len = new_size / sizeof(siridb_point_t);
+    size_t read_at_once = (size_t) (MAX_BUFFER_SZ / cur_size);
+    size_t max_len = cur_len > new_len ? cur_len : new_len;
     size_t num, i;
-    char * buf;
-    char * pt, * end;
+    char * buf, * pt;
     long int offset = 0;
     siridb_series_t * series;
     _Bool log_migrate = 1;
@@ -274,24 +259,26 @@ int siridb_buffer_load(siridb_t * siridb)
 
     log_info("Loading and cleanup buffer");
 
-    buf = malloc(read_at_once * buffer->size);
-    buffer->template = malloc(buffer->size);
+    /* we can already set the new buffer size */
+    buffer->size = new_size;
+    buffer->len = new_len;
+
+    buf = malloc(read_at_once * cur_size);
+    buffer->template = malloc(new_size);
     if (buf == NULL || buffer->template == NULL)
     {
-        free(buf);
+        free(buf);  /* buffer->template will be cleaned */
         log_critical("Allocation error while loading buffer");
         return -1;
     }
 
-    for (   pt = buffer->template,
-            end = buffer->template + buffer->size;
-            pt < end;
-            pt += sizeof(uint64_t))
+    if (new_size != cur_size)
     {
-        memcpy(pt, &buffer__end, sizeof(uint64_t));
+        log_warning(
+                "Changing buffer size from %zu to %zu", cur_size, new_size);
     }
 
-    memcpy(buffer->template, &buffer__start, sizeof(uint32_t));
+    buffer__init_template(buffer->template, new_size);
 
     siridb_misc_get_fn(fn, buffer->path, SIRIDB_BUFFER_FN)
     siridb_misc_get_fn(fn_temp, buffer->path, "__" SIRIDB_BUFFER_FN)
@@ -325,11 +312,11 @@ int siridb_buffer_load(siridb_t * siridb)
         return -1;
     }
 
-    while ((num = fread(buf, buffer->size, read_at_once, fp)))
+    while ((num = fread(buf, cur_size, read_at_once, fp)))
     {
         for (i = 0; i < num; i++)
         {
-            pt = buf + i * buffer->size;
+            pt = buf + i * cur_size;
 
             buf_start = *((uint32_t *) pt);
             if (buf_start != buffer__start)
@@ -339,7 +326,7 @@ int siridb_buffer_load(siridb_t * siridb)
                     log_warning("Buffer will be migrated");
                     log_migrate = 0;
                 }
-                buffer__migrate_to_new(pt, buffer->size);
+                buffer__migrate_to_new(pt, cur_size);
             }
 
             pt += sizeof(uint32_t);
@@ -353,15 +340,12 @@ int siridb_buffer_load(siridb_t * siridb)
                 continue;
             }
 
-            series->buffer = siridb_points_new(buffer->len, series->tp);
+            series->buffer = siridb_points_new(max_len, series->tp);
             if (series->buffer == NULL)
             {
                 log_critical("Cannot allocate a buffer for series id %u",
                         series->id);
-                fclose(fp);
-                fclose(fp_temp);
-                free(buf);
-                return -1;  /* signal is raised */
+                goto failed;
             }
 
             series->bf_offset = offset;
@@ -372,26 +356,63 @@ int siridb_buffer_load(siridb_t * siridb)
                 siridb_points_add_point(series->buffer, ts, val);
             }
 
-            offset += buffer->size;
-
-            /* increment series->length which is 0 at this time */
+            offset += new_size;
             series->length += series->buffer->len;
 
+            pt = buf + i * cur_size;
+            if (new_size > cur_size)
+            {
+                memcpy(buffer->template, pt, cur_size);
+                pt = buffer->template;
+            }
+            else if (new_size < cur_size)
+            {
+                if (series->buffer->len >= new_len)
+                {
+                    if (siridb_shards_add_points(
+                            siridb,
+                            series,
+                            series->buffer))
+                    {
+                        log_critical("Error while sharding points");
+                        goto failed;
+                    }
+                    series->buffer->len = 0;
+                    memcpy(
+                            buffer->template + 4,
+                            &series->id,
+                            sizeof(uint32_t));
+                    pt = buffer->template;
+                }
+
+                if (siridb_points_resize(series->buffer, new_len))
+                {
+                    log_critical("Allocation error while resizing points");
+                    goto failed;
+                }
+            }
+
             /* write to output file and check if write was successful */
-            if ((fwrite(buf + i*buffer->size, buffer->size, 1, fp_temp) != 1))
+            if ((fwrite(pt, new_size, 1, fp_temp) != 1))
             {
                 log_critical("Could not write to temporary buffer file: '%s'",
                         fn_temp);
-                fclose(fp);
-                fclose(fp_temp);
-                free(buf);
-                return -1;
+                goto failed;
             }
         }
     }
 
-    free(buf);
+    if (new_size != cur_size)
+    {
+        if (siridb_save(siridb))
+        {
+            log_critical("Cannot save changes to SiriDB (database.dat)");
+            goto failed;
+        }
+        buffer__init_template(buffer->template, new_size);
+    }
 
+    free(buf);
     if (fclose(fp) ||
         fclose(fp_temp) ||
         rename(fn_temp, fn))
@@ -401,6 +422,12 @@ int siridb_buffer_load(siridb_t * siridb)
     }
 
     return 0;
+
+failed:
+    fclose(fp);
+    fclose(fp_temp);
+    free(buf);
+    return -1;
 }
 
 /*
@@ -486,3 +513,38 @@ static int buffer__create_new(
 
     return 0;
 }
+
+static void buffer__migrate_to_new(char * pt, size_t sz)
+{
+    char * npt = pt;
+    char * end = pt + sz;
+    uint32_t series_id = *((uint32_t *) pt);
+    pt += sizeof(uint32_t);
+    size_t num = *((size_t *) pt);
+    pt += sizeof(size_t);
+
+    memcpy(npt, &buffer__start, sizeof(uint32_t));
+    npt += sizeof(uint32_t);
+    memcpy(npt, &series_id, sizeof(uint32_t));
+    npt += sizeof(uint32_t);
+    memmove(npt, pt, num * 16);
+    npt += num * 16;
+
+    for (; npt < end; npt += sizeof(uint64_t))
+    {
+        memcpy(npt, &buffer__end, sizeof(uint64_t));
+    }
+}
+
+static void buffer__init_template(char * template, size_t size)
+{
+    char * pt, * end;
+    for (   pt = template,
+            end = template + size;
+            pt < end;
+            pt += sizeof(uint64_t))
+    {
+        memcpy(pt, &buffer__end, sizeof(uint64_t));
+    }
+    memcpy(template, &buffer__start, sizeof(uint32_t));
+}
index fc2d3b97d58281801019f0207b35a8d06f0ebf00..5923b14283d564277844e71ba9a991a8b9c71749 100644 (file)
@@ -385,9 +385,8 @@ static int siridb__from_unpacker(
         READ_DB_EXIT_WITH_ERROR("Cannot read buffer size.")
     }
 
-    /* bind buffer size and len to SiriDB */
+    /* bind buffer size to SiriDB */
     (*siridb)->buffer->size = (size_t) qp_obj.via.int64;
-    (*siridb)->buffer->len = (*siridb)->buffer->size / sizeof(siridb_point_t);
 
     /* read number duration  */
     if (qp_next(unpacker, &qp_obj) != QP_INT64)
@@ -849,6 +848,7 @@ static int siridb__read_conf(siridb_t * siridb)
 
     /* read buffer size from database.conf */
     rc = cfgparser_get_option(&option, cfgparser, "buffer", "size");
+
     if (rc == CFGPARSER_SUCCESS && option->tp == CFGPARSER_TP_INTEGER)
     {
         ssize_t ssize = option->val->integer;
@@ -862,11 +862,26 @@ static int siridb__read_conf(siridb_t * siridb)
         }
         else
         {
-            buffer->nsize = (buffer->size == (size_t) ssize) ?
+            buffer->_to_size = (buffer->size == (size_t) ssize) ?
                     0 : (size_t) ssize;
         }
     }
-
+    else
+    {
+        FILE * fp = fopen(buf, "a");
+        if (fp != NULL)
+        {
+            if (rc == CFGPARSER_ERR_SECTION_NOT_FOUND)
+            {
+                (void) fprintf(fp, "\n[buffer]\nsize = %zu\n", buffer->size);
+            }
+            else if (rc == CFGPARSER_ERR_OPTION_NOT_FOUND)
+            {
+                (void) fprintf(fp, "\nsize = %zu\n", buffer->size);
+            }
+            (void) fclose(fp);
+        }
+    }
     cfgparser_free(cfgparser);
 
     return (buffer->path == NULL) ? -1 : 0;